package collector

import (
	"dragonfly/common/config"
	"dragonfly/common/log"
	"dragonfly/common/tcp"
	"dragonfly/common/types"
	"fmt"
	"time"

	"github.com/rs/zerolog"
)

type Collector struct {
	*tcp.AsyncTCPServer
	hostinfo   *types.Host
	log        *zerolog.Logger
	agentQueue *ConnPool
}

var collector Collector

func InitCollector(cfg *config.Config) error {
	protocol := &tcp.DefaultProtocol{}
	protocol.SetMaxPacketSize(0)
	s := tcp.NewAsyncTCPServer(
		*cfg.CollectorSettings.ListenAddress,
		&callback{},
		protocol,
	)

	collector = Collector{
		s,
		&types.Host{Name: "collector"},
		log.NewLogger(&cfg.LogSettings),
		&ConnPool{},
	}

	collector.log.Info().Msgf("Listening on %s", *cfg.CollectorSettings.ListenAddress)
	return collector.ListenAndServe()
}

func InitAgentPool(c *Collector) error {
	agentPool := []string{"127.0.0.1:10001"}

	connPool := NewConnPool(c, agentPool)
	if len(connPool.pool) < 1 {
		return fmt.Errorf("Agent connection pool is null")
	}
	c.agentQueue = connPool
	return nil
}

func (this *Collector) getHeartbeat() {
	heartbeat_t := time.NewTicker(time.Second * 2)
	pkt := tcp.NewDefaultPacket(
		types.MsgGetHeartbeat,
		this.hostinfo.Encode(),
	)
	for addr, agentConn := range this.agentQueue.pool {
		go func(a string, conn *tcp.TCPConn) {
			this.log.Debug().Msgf("get heartbeat from agent %s", a)
			if err := conn.AsyncWritePacket(pkt); err != nil {
				this.log.Error().Msg("get heartbeat failed")
			}

			for {
				select {
				case <-heartbeat_t.C:
					if err := conn.AsyncWritePacket(pkt); err != nil {
						this.log.Error().Msg("get heartbeat failed")
					}
				}
			}
		}(addr, agentConn)
	}
}

func (this *Collector) StartTask() {
	go this.getHeartbeat()
}
